
feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2) #732

Open

bootjp wants to merge 2 commits into main from feat/sqs-htfifo-dispatch-wiring

Conversation

Owner

@bootjp bootjp commented May 2, 2026

Summary

Stage 2 of Phase 3.D PR 5b: wire SendMessage / ReceiveMessage / DeleteMessage / ChangeMessageVisibility through the per-key dispatch helpers landed in PR 5b-1 (#731). The §11 PR 2 dormancy gate still rejects PartitionCount > 1 at CreateQueue, so production queues stay on the legacy keyspace and observable behaviour is byte-identical. PR 5b-3 lifts the gate atomically with the capability check.

What's wired

Send paths

  • sendMessage, sendBatchStandardOnce: partition is a compile-time 0 (Standard queues reject PartitionCount > 1 via the cross-attribute validator). Dispatch helpers route to the legacy keyspace on the steady-state path.
  • sendFifoMessage: hashes MessageGroupId once via partitionFor and threads the result through every key construction (data, vis, byage, dedup, group-lock).
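
For orientation, a minimal sketch of that hash step (assuming the FNV-1a + power-of-two-mask scheme the review thread below describes for partitionFor; the function name, parameters, and the perQueue flag here are illustrative stand-ins, not the adapter's real signature):

package sketch

import "hash/fnv"

// partitionForSketch: FNV-1a over the MessageGroupId, masked by a
// power-of-two partition count. Legacy (count <= 1) and perQueue
// queues collapse to partition 0.
func partitionForSketch(partitionCount uint32, perQueue bool, messageGroupID string) uint32 {
    if partitionCount <= 1 || perQueue {
        return 0
    }
    h := fnv.New32a()
    _, _ = h.Write([]byte(messageGroupID))
    return h.Sum32() & (partitionCount - 1) // mask == modulo because the count is a power of two
}

Because the hash is deterministic, a given MessageGroupId always lands in the same partition for the queue's lifetime, which is what keeps per-group FIFO ordering intact across the receive fanout.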

Receive fanout

  • scanAndDeliverOnce wraps the scan loop with a per-partition fanout over effectivePartitionCount(meta) iterations. The shared wall-clock + per-call Max budget caps apply across the whole call, not per-partition.
  • sqsMsgCandidate carries a new partition field stamped at scan time, so loadCandidateRecord / expireMessage / commitReceiveRotation / classifyFifoGroupLock all route to the partition the message was originally stored under.
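
A minimal sketch of the fanout loop from the first bullet above, reflecting its final shape after the round-1 rotation fix discussed later in this thread (effectivePartitionCount and startPartitionOffset are quoted there; the callback and parameter names below are illustrative, and the real helper folds readTS through a byte slice to stay gosec-clean):

package sketch

import "time"

// startPartitionOffsetSketch: fold the two 32-bit halves of readTS and
// mask by partitions-1 (simplified fold; the real helper is quoted below).
func startPartitionOffsetSketch(partitions uint32, readTS uint64) uint32 {
    if partitions <= 1 {
        return 0
    }
    folded := uint32(readTS>>32) ^ uint32(readTS&0xFFFFFFFF)
    return folded & (partitions - 1)
}

// fanoutReceiveSketch: one shared deadline and one shared max budget across
// every partition; the starting partition rotates with readTS so a backlog
// on one partition cannot starve the others.
func fanoutReceiveSketch(
    partitions uint32,
    readTS uint64,
    maxMsgs int,
    deadline time.Time,
    scanPartition func(partition uint32, budget int) []string, // stand-in for scanAndDeliverPartition
) []string {
    var delivered []string
    start := startPartitionOffsetSketch(partitions, readTS)
    for i := uint32(0); i < partitions; i++ {
        if len(delivered) >= maxMsgs || time.Now().After(deadline) {
            break // the budget is per call, never per partition
        }
        p := (start + i) % partitions
        delivered = append(delivered, scanPartition(p, maxMsgs-len(delivered))...)
    }
    return delivered
}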

Delete + ChangeMessageVisibility

  • handle.Partition from the v2 receipt handle drives every key construction.
  • loadMessageForDelete / loadAndVerifyMessage invoke the new validateReceiptHandleVersion once meta is in scope.

Receipt-handle dispatch (the contract shift)

  • encodeReceiptHandleDispatch(meta, partition, ...) is the single point that picks v1 vs v2.
  • decodeClientReceiptHandle is now version-agnostic; the queue-aware version check moved to validateReceiptHandleVersion. v1-on-partitioned and v2-on-non-partitioned both surface as ReceiptHandleIsInvalid, preserving the dormancy promise (the v2 wire format is never reachable from the public API) under the new contract. Since the §11 PR 2 dormancy gate keeps every production queue non-partitioned in this PR, every v2 handle still surfaces as ReceiptHandleIsInvalid downstream — exactly the PR 5a observable behaviour.
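
A minimal sketch of the version pick (the real single point is encodeReceiptHandleDispatch, and validateReceiptHandleVersion is quoted in full in the review thread below; the struct and constant names here are simplified stand-ins):

package sketch

// queueMetaSketch stands in for the adapter's sqsQueueMeta.
type queueMetaSketch struct{ PartitionCount uint32 }

const (
    handleV1 = 1 // legacy handle, byte-identical to the pre-partition format
    handleV2 = 2 // records the partition the message was stored under
)

// pickHandleVersion: a partitioned keyspace (PartitionCount > 1) emits v2
// handles so DeleteMessage / ChangeMessageVisibility can route back to the
// same partition; everything else stays on v1, preserving the dormancy promise.
func pickHandleVersion(meta *queueMetaSketch) int {
    if meta != nil && meta.PartitionCount > 1 {
        return handleV2
    }
    return handleV1
}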

Reaper

The reaper iterates legacy byAge keys only — partition-iterating enumeration ships in the later partition-reaper PR. buildReapOps / reapOneRecord pass nil meta + partition 0 through dispatch helpers so the keys are byte-identical to the pre-PR-5b layout.

What does NOT change yet

  • CreateQueue still rejects PartitionCount > 1 with InvalidAttributeValue (the §11 PR 2 dormancy gate, lifted atomically in PR 5b-3).
  • Production queues never enter the partitioned dispatch branches in this PR — observable behaviour is byte-identical.
  • The reaper does not yet enumerate partitioned byAge keys (a later PR).
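
A hedged sketch of the dormancy gate from the first bullet above (the real check is validatePartitionDormancyGate, invoked during CreateQueue attribute validation; the signature and error text here are illustrative, and mapping the failure to the InvalidAttributeValue API response happens in the caller):

package sketch

import "errors"

// validatePartitionDormancyGateSketch: until PR 5b-3 lifts the gate,
// any PartitionCount above 1 is rejected at queue creation.
func validatePartitionDormancyGateSketch(partitionCount uint32) error {
    if partitionCount > 1 {
        return errors.New("PartitionCount > 1 is not yet enabled (dormancy gate)")
    }
    return nil
}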

Test plan

10 new tests across the contract surface.

Unit tests (adapter/sqs_keys_dispatch_v2_test.go):

  • TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount — pins the version dispatch decision across nil / 0 / 1 / 4 metas.
  • TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1 — protects the byte-identical guarantee on legacy queues.
  • TestEncodeReceiptHandleDispatch_PerQueueUsesV2 — pins that perQueue + PartitionCount=4 still produces v2 handles (keyspace is partitioned even when partitionFor collapses every group to partition 0).
  • TestValidateReceiptHandleVersion_QueueAwareRules — 10 sub-cases covering the (meta.PartitionCount × handle.Version) matrix, including perQueue + PartitionCount=4.
  • TestValidateReceiptHandleVersion_NilHandle — defensive nil branch.
  • TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned — named regression for the dormancy guarantee under the new contract.
  • TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy — byte-identical to legacy sqsMsgVisScanBounds on legacy meta.
  • TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix — different partitions yield disjoint scan ranges.
  • TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace — pins the PR #731 round 2 forward-note invariant: perQueue + PartitionCount=4 keeps the partitioned vis prefix at partition 0; collapsing to legacy would silently strand send writes.

Integration tests (adapter/sqs_partitioned_dispatch_test.go) install a partitioned meta directly on a queue created via the public API, short-circuiting the dormancy gate without disabling it for production CreateQueue:

  • TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip — end-to-end smoke test: send 6 groups, receive surfaces all via fanout, every handle is v2, delete via v2 handle, queue is empty afterwards, legacy keyspace stays empty.
  • TestSQSServer_PartitionedFIFO_RejectsV1Handle — forged v1 handle on a partitioned queue surfaces as ReceiptHandleIsInvalid via DeleteMessage and ChangeMessageVisibility.
  • TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero — perQueue + PartitionCount=4 receive surfaces every message in one fanout pass; every v2 handle records Partition=0.

Updated TestDecodeClientReceiptHandle_RejectsV2 to TestDecodeClientReceiptHandle_AcceptsV2 to reflect the contract shift (rejection moved from the API boundary to the meta-aware validateReceiptHandleVersion).

  • go test -race ./adapter/... (targeted SQS scope) clean.
  • golangci-lint run ./adapter/... clean.

Self-review (per CLAUDE.md)

  1. Data loss — Dispatch helpers byte-identical on legacy queues (PartitionCount<=1 routes to legacy constructors). DLQ FIFO computes dlqPartition via partitionFor(dlqMeta, srcRec.MessageGroupId) so cross-queue redrive lands in the right partition. Receive fanout scans every partition. No issue.
  2. Concurrency — Receive fanout iterates partitions sequentially under one shared wall-clock + max budget, so the original per-call SLA is preserved. meta is loaded once and passed by pointer through helpers; no concurrent meta refresh in a single call. validateReceiptHandleVersion runs after the gen check so a SetQueueAttributes race cannot flip the answer (PartitionCount is immutable). go test -race clean.
  3. Performance — Hot path adds 1 nil-check + 1 PartitionCount compare per dispatch. Receive fanout iterates effectivePartitionCount(meta) times: 1 on legacy / perQueue, N on perMessageGroupId with N partitions splitting roughly 1/N of the messages each.
  4. Data consistency — Receipt handle's recorded partition matches the storage partition (commitReceiveRotation uses cand.partition for both newVisKey and the handle). Cross-version handles rejected as ReceiptHandleIsInvalid — no leak between keyspaces. perQueue + PartitionCount=4 invariant pinned by both unit and integration tests.
  5. Test coverage — 10 new tests across the contract surface. Existing legacy-path SQS tests pass unchanged.

Caller audit (semantic-change discipline)

decodeClientReceiptHandle semantics changed from "reject all v2" to "decode any version, defer version validation to meta-aware caller". All 3 production callers audited via grep:

  • parseQueueAndReceipt (sqs_messages.go:1576) → deleteMessageWithRetry → loadMessageForDelete → validateReceiptHandleVersion after meta load. ✅
  • DeleteMessageBatch entry (sqs_messages_batch.go:475) → deleteMessageWithRetry → same as above. ✅
  • ChangeMessageVisibilityBatch entry (sqs_messages_batch.go:575) → changeVisibilityWithRetry → loadAndVerifyMessage → validateReceiptHandleVersion. ✅

The dormancy guarantee (v2 → ReceiptHandleIsInvalid on non-partitioned) is preserved across every caller path.

Follow-ups

  • PR 5b-3 (next): atomic gate-lift + CreateQueue capability check via PollSQSHTFIFOCapability (#721). Removes the validatePartitionDormancyGate call and verifies cluster-wide HT-FIFO support before accepting PartitionCount > 1.
  • PR 6: partition-iterating reaper (drops the "legacy byAge only" caveat in buildReapOps).

Summary by CodeRabbit

  • New Features

    • Added partition-aware message dispatch for improved FIFO queue handling.
    • Introduced receipt handle v2 format support for partitioned queues.
  • Bug Fixes

    • Enhanced receipt handle validation for queue-aware versioning.
    • Improved message routing and deletion for partitioned configurations.
  • Tests

    • Added comprehensive test coverage for partitioned queue operations and round-trip message delivery.

…(Phase 3.D PR 5b-2)

Stage 2 of PR 5b: wire SendMessage / ReceiveMessage / DeleteMessage
/ ChangeMessageVisibility through the per-key dispatch helpers
landed in PR 5b-1. The section 11 PR 2 dormancy gate still rejects
PartitionCount > 1 at CreateQueue, so production queues remain on
the legacy keyspace and observable behaviour is byte-identical.
PR 5b-3 lifts the gate atomically with the capability check.

Wiring

  - SendMessage (sendMessage, sendBatchStandardOnce): partition is
    a compile-time 0 (Standard queues reject PartitionCount > 1
    via the cross-attribute validator). Dispatch helpers route to
    legacy on the steady-state path.
  - sendFifoMessage: hashes MessageGroupId once via partitionFor
    and threads the result through every key construction (data,
    vis, byage, dedup, group-lock).
  - ReceiveMessage: scanAndDeliverOnce wraps the scan loop with a
    per-partition fanout over effectivePartitionCount(meta)
    iterations. The shared wall-clock + per-call max budget caps
    apply across the whole call, not per-partition. sqsMsgCandidate
    carries the partition field stamped at scan time so downstream
    rotate / delete / expire helpers route to the right partition.
  - DeleteMessage / ChangeMessageVisibility: handle.Partition from
    the v2 receipt handle drives every key construction.

Receipt-handle dispatch

  encodeReceiptHandleDispatch(meta, partition, ...) is the single
  point that picks v1 vs v2. ReceiveMessage on a partitioned queue
  records cand.partition into the handle so DeleteMessage routes
  back to the same partition.

  decodeClientReceiptHandle is now version-agnostic; the queue-
  aware version check moved to validateReceiptHandleVersion which
  runs in loadMessageForDelete / loadAndVerifyMessage once meta is
  in scope. v1 on a partitioned queue and v2 on a non-partitioned
  queue both surface as ReceiptHandleIsInvalid -- preserving the
  dormancy promise (the v2 wire format is never reachable from the
  public API) under the new contract. The section 11 PR 2 dormancy gate
  still in force in this PR keeps every queue non-partitioned, so
  v2 handles still surface as ReceiptHandleIsInvalid downstream,
  the exact PR 5a observable behaviour.

Reaper

  Reaper iterates legacy byAge keys only -- partition-iterating
  enumeration ships in the later partition reaper PR. buildReapOps
  / reapOneRecord pass nil meta + partition 0 through dispatch
  helpers so the keys are byte-identical to the pre-PR-5b layout.

Tests (10 added)

  Unit tests (sqs_keys_dispatch_v2_test.go):
    - TestEncodeReceiptHandleDispatch_PicksVersionByPartitionCount:
      pins the version dispatch decision across nil/0/1/4 metas.
    - TestEncodeReceiptHandleDispatch_LegacyByteIdenticalToV1:
      protects the byte-identical guarantee on legacy queues.
    - TestEncodeReceiptHandleDispatch_PerQueueUsesV2: pins that
      perQueue + PartitionCount=4 still produces v2 handles
      (keyspace is partitioned even when partitionFor collapses
      every group to partition 0).
    - TestValidateReceiptHandleVersion_QueueAwareRules: 10
      sub-cases covering the (meta.PartitionCount x handle.Version)
      matrix, including perQueue + PartitionCount=4.
    - TestValidateReceiptHandleVersion_NilHandle: defensive nil.
    - TestValidateReceiptHandleVersion_RejectsV2OnNonPartitioned:
      named regression for the dormancy guarantee under the new
      contract.
    - TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy:
      byte-identical to legacy sqsMsgVisScanBounds on legacy meta.
    - TestSQSMsgVisScanBoundsDispatch_PartitionedUsesPartitionedPrefix:
      different partitions yield disjoint scan ranges.
    - TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace:
      pins the PR 731 round 2 forward-note invariant -- perQueue
      + PartitionCount=4 keeps the partitioned vis prefix at
      partition 0; collapsing to legacy would silently strand
      send writes.

  Integration tests (sqs_partitioned_dispatch_test.go) install a
  partitioned meta directly on a queue created via the public API,
  short-circuiting the dormancy gate without disabling it for
  production CreateQueue:
    - TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip:
      end-to-end smoke test (send 6 groups, receive surfaces all
      via fanout, every handle is v2, delete via v2 handle, queue
      is empty afterwards, legacy keyspace stays empty).
    - TestSQSServer_PartitionedFIFO_RejectsV1Handle: forged v1
      handle on a partitioned queue surfaces as ReceiptHandleIsInvalid
      via DeleteMessage and ChangeMessageVisibility.
    - TestSQSServer_PartitionedFIFO_PerQueueCollapsesToPartitionZero:
      perQueue + PartitionCount=4 receive surfaces every message
      in one fanout pass; every v2 handle records Partition=0.

  Updated TestDecodeClientReceiptHandle_RejectsV2 to
  TestDecodeClientReceiptHandle_AcceptsV2 to reflect the contract
  shift (rejection moved from API boundary to meta-aware
  validateReceiptHandleVersion).

Self-review (per CLAUDE.md)

1. Data loss -- Dispatch helpers byte-identical on legacy queues
   (PartitionCount<=1 routes to legacy constructors). DLQ FIFO
   computes dlqPartition via partitionFor(dlqMeta, srcRec.MessageGroupId)
   so cross-queue redrive lands in the right partition. Receive
   fanout scans every partition. No issue.
2. Concurrency -- Receive fanout iterates partitions sequentially
   under one shared wall-clock + max budget, so the original
   per-call SLA is preserved. meta is loaded once and passed by
   pointer through helpers; no concurrent meta refresh in a
   single call. validateReceiptHandleVersion runs after the gen
   check so a SetQueueAttributes race cannot flip the answer
   (PartitionCount is immutable). go test -race clean.
3. Performance -- Hot path adds 1 nil-check + 1 PartitionCount
   compare per dispatch. Receive fanout iterates effectivePartitionCount(meta)
   times: 1 on legacy / perQueue, N on perMessageGroupId, with
   N partitions splitting roughly 1/N of the messages each.
4. Data consistency -- Receipt handle's recorded partition matches
   the storage partition (commitReceiveRotation uses cand.partition
   for both newVisKey and the handle). Cross-version handles
   rejected as ReceiptHandleIsInvalid -- no leak between keyspaces.
   perQueue + PartitionCount=4 invariant pinned by both unit and
   integration tests.
5. Test coverage -- 10 new tests across the contract surface.
   Existing legacy-path SQS tests pass unchanged.

go test -race ./adapter/... and golangci-lint clean.

coderabbitai Bot commented May 2, 2026

📝 Walkthrough


This PR introduces partition-aware dispatch mechanisms for SQS message keys and receipt-handle encoding. Core dispatch helpers (sqsMsgDataKeyDispatch, sqsMsgVisKeyDispatch, etc.) are wired into FIFO send/receive, standard batch operations, deletion, visibility updates, redrive, and reaper flows to route and encode messages by partition. Partition-stamped message candidates enable downstream operations to use correct per-partition keys. V2 receipt-handle format and queue-aware version validation are added, with legacy non-partitioned queues (partition count ≤1) remaining on v1.

Changes

Partition-Aware SQS Dispatch

  • Dispatch Infrastructure (adapter/sqs_keys_dispatch.go, adapter/sqs_keys_dispatch_v2_test.go): Introduces sqsMsgVisScanBoundsDispatch, encodeReceiptHandleDispatch, and validateReceiptHandleVersion to centralize partition-aware key encoding, visibility scanning, and receipt-handle version enforcement (v2 for partitioned queues, v1 for non-partitioned).
  • FIFO Send & Dedup (adapter/sqs_fifo.go): loadFifoDedupRecord and loadFifoGroupLock now accept meta/partition and use dispatch key builders. sendFifoMessage computes the partition via partitionFor(meta, MessageGroupId) and constructs all transaction keys (data/visibility/by-age) using dispatch variants.
  • Message Routing & Candidate Partitioning (adapter/sqs_messages.go): decodeClientReceiptHandle simplified to defer version validation; scanAndDeliverOnce adds a partition fanout with a rotated starting partition based on readTS. sqsMsgCandidate gains a partition field; visibility scanning and candidate loading use dispatch-based key construction. tryDeliverCandidate refactored to accept meta and route via partition-aware group-lock and receipt-handle encoding.
  • Standard Queue Batches & Deletion (adapter/sqs_messages_batch.go, adapter/sqs_messages.go delete paths): Standard sendMessageBatch and delete/visibility operations use partition=0 dispatch key builders. deleteMessageWithRetry and loadMessageForDelete validate the receipt-handle version and construct partitioned keys. changeVisibilityWithRetry routes old/new visibility keys via dispatch.
  • Redrive & Reaper (adapter/sqs_redrive.go, adapter/sqs_reaper.go): redriveCandidateToDLQ now accepts srcMeta instead of srcGen; buildRedriveOps derives the source partition from the group ID and uses partition dispatch for DLQ key construction. reapOneRecord and buildReapOps adopt partition-dispatch key builders with a hardcoded partition=0 for legacy compatibility.
  • Integration & End-to-End Tests (adapter/sqs_partitioned_dispatch_test.go, adapter/sqs_receipt_handle_v2_test.go): Round-trip tests verify partitioned FIFO send/receive/delete with v2 handles and partition IDs; v1 handle rejection on partitioned queues; per-queue throughput collapse to partition 0; and partition offset rotation. decodeClientReceiptHandle now accepts (not rejects) v2 handles, with downstream validation.

Sequence Diagram

sequenceDiagram
    participant Client
    participant SQS Server
    participant Partitioner
    participant Visibility Store
    participant Data Store

    rect rgba(100, 150, 200, 0.5)
    Note over Client,Data Store: Partitioned FIFO Send
    Client->>SQS Server: SendMessage(MessageGroupId="group-A")
    SQS Server->>Partitioner: partitionFor(meta, "group-A")
    Partitioner-->>SQS Server: partition = 2
    SQS Server->>Data Store: Write(sqsMsgDataKeyDispatch(..., partition=2, ...))
    SQS Server->>Visibility Store: Write(sqsMsgVisKeyDispatch(..., partition=2, ...))
    SQS Server-->>Client: SendMessageResponse(MessageId)
    end

    rect rgba(200, 100, 150, 0.5)
    Note over Client,Data Store: Partitioned Fanout Receive
    Client->>SQS Server: ReceiveMessage(MaxMessages=10)
    SQS Server->>SQS Server: readTS → startPartitionOffset → partition[0,1,2,3]
    loop For Each Partition
        SQS Server->>Visibility Store: Scan(sqsMsgVisScanBoundsDispatch(..., partition, ...))
        Visibility Store-->>SQS Server: Candidates{partition, visKey, messageID}
    end
    SQS Server->>Data Store: Load(sqsMsgDataKeyDispatch(..., cand.partition, ...))
    SQS Server->>SQS Server: encodeReceiptHandleDispatch(..., cand.partition, ...) → v2 handle
    SQS Server-->>Client: ReceiveMessageResponse(Messages=[...], ReceiptHandles=[v2])
    end

    rect rgba(150, 200, 100, 0.5)
    Note over Client,Data Store: Partition-Aware Delete
    Client->>SQS Server: DeleteMessage(ReceiptHandle=v2)
    SQS Server->>SQS Server: validateReceiptHandleVersion(meta, v2_handle)
    SQS Server->>Data Store: Delete(sqsMsgDataKeyDispatch(..., v2_handle.partition, ...))
    SQS Server->>Visibility Store: Delete(sqsMsgVisKeyDispatch(..., v2_handle.partition, ...))
    SQS Server-->>Client: DeleteMessageResponse()
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 Partitions fan out across the keyspace wide,
Each message grouped with grace, no more to hide!
Dispatch helpers route with wisdom true,
V2 handles track which shard is due.
From send to delete, each path's refined—
Rabbit-fast delivery, partition-aligned! 🚀

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
  • Description Check: ✅ Passed. Check skipped - CodeRabbit's high-level summary is enabled.
  • Title check: ✅ Passed. The title accurately reflects the main change: wiring the partitioned-FIFO data plane through dispatch helpers. It's specific, concise, and clearly summarizes the primary technical contribution.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 92.00%, which is sufficient; the required threshold is 80.00%.
  • Linked Issues check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check: ✅ Passed. Check skipped because no linked issues were found for this pull request.


Owner Author

bootjp commented May 2, 2026

@claude review


claude Bot commented May 2, 2026

Claude finished @bootjp's task in 8m 28s


Code Review: feat(sqs): wire partitioned-FIFO data plane through dispatch helpers (Phase 3.D PR 5b-2)

  • Gather PR context and understand the request
  • Read changed files (9 adapter files, ~900 additions / ~160 deletions)
  • Analyze dispatch helpers and wiring logic
  • Review test coverage
  • Check for data loss, concurrency, and consistency issues
  • Post complete review

Overall assessment

This is a well-structured, carefully staged PR. The core abstraction — a single dispatch layer (sqs_keys_dispatch.go) that routes to legacy or partitioned key constructors based on meta.PartitionCount — is the right shape, and the dormancy guarantee (no observable change on production queues) holds cleanly. The self-review in the PR description is thorough and accurate on all five CLAUDE.md lenses. There is one medium-severity finding and a few minor notes.


Medium: validateReceiptHandleVersion does not bound-check handle.Partition against meta.PartitionCount

File: adapter/sqs_keys_dispatch.go:165–196

validateReceiptHandleVersion verifies that the handle's version matches the queue's partition mode, but it does not verify that the partition index in a v2 handle is within [0, meta.PartitionCount).

A client that receives a legitimate v2 handle (partition = N) could forge a new v2 handle for the same queue with an out-of-range partition (e.g., partition = 63 on a 4-partition queue). After passing the version check, loadMessageForDelete constructs a key for partition 63, gets ErrKeyNotFound, and returns sqsDeleteNoOp — silently succeeding. This is AWS-idempotent but slightly incorrect semantics for a forged handle. More importantly, it leaks structural information: the client can confirm partition-key-space boundaries exist at that queue by observing the silent success vs. an explicit invalid-handle error.

// Suggested addition in validateReceiptHandleVersion, after the version checks:
if meta != nil && meta.PartitionCount > 1 && handle.Partition >= meta.PartitionCount {
    return errors.New("receipt handle partition out of range")
}

Since PartitionCount is immutable after queue creation (enforced by validatePartitionImmutability), this check is race-safe. The dormancy gate in PR 5b-2 means this can't be triggered today, but it should be in place before the gate is lifted in PR 5b-3.



Minor observations

1. now is computed per-partition in scanAndDeliverPartition (sqs_messages.go:901)

Before this PR, now was computed once in scanAndDeliverOnce. After, it is computed fresh at the start of each scanAndDeliverPartition call, meaning a message that crosses its visibility threshold between partitions 0 and 1 during a fanout receive might be delivered in partition 1's scan but would not have been found in a single-partition equivalent. This is strictly a behavior improvement (AWS does not guarantee anything stronger than eventual delivery anyway), but it is a subtle semantic delta worth noting in the PR description for future readers.

2. Integration test's legacy-keyspace probe end key uses 4 bytes of 0xFF (sqs_partitioned_dispatch_test.go:184)

end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)

The actual message-ID suffix in the key is a base64url-encoded 16-byte UUID (~22 chars). The 4-byte 0xFF suffix works correctly because all base64url characters (A-Z, a-z, 0-9, -, _) have byte values < 0xFF, so any base64url string of any length compares less than [0xFF 0xFF 0xFF 0xFF]. However, using prefixScanEnd(legacyDataPrefix) (already available in the codebase at adapter/prefix_scan.go) would make the intent explicit and match the pattern used in the rest of the reaper code. Not a bug, just a consistency nit.

3. sqsMsgVisScanBoundsDispatch duplicates the scan-bounds logic from sqsMsgVisScanBounds (sqs_keys_dispatch.go:128–141)

The two functions share identical logic (construct prefix, zero-pad start, increment upper bound for end). A minor refactor opportunity: sqsMsgVisScanBoundsDispatch could delegate to sqsMsgVisScanBounds on the legacy branch rather than re-implementing the bounds math inline. The test TestSQSMsgVisScanBoundsDispatch_LegacyMatchesLegacy catches any divergence, so this is low risk either way.


Correctness verification (key paths)

Path → partition source (consistent with send?)

  • sendFifoMessage key construction → partitionFor(meta, in.MessageGroupId)
  • commitReceiveRotation vis-key + handle → cand.partition (stamped at scan)
  • buildDeleteOps vis/byage/group-lock keys → handle.Partition (v2 handle)
  • changeVisibilityWithRetry old/new vis key → handle.Partition (v2 handle)
  • buildRedriveOps DLQ keys → partitionFor(dlqMeta, dlqRec.MessageGroupId)
  • Reaper (buildReapOps) → nil meta + partition 0, legacy only ✅ (intentional, PR 6 will extend)
  • validateReceiptHandleVersion → version-only check ⚠️ partition bounds not checked

Snapshot consistency: readTS is computed once in scanAndDeliverOnce and threaded identically through all partition fanout calls — consistent multi-partition snapshot. ✅

Group-lock partition coherence: partitionFor is deterministic (FNV-1a, power-of-two mask); a given MessageGroupId maps to exactly one partition for the lifetime of a queue generation. The group lock is always written and read at that same partition. ✅

PartitionCount immutability: validatePartitionImmutability is enforced on SetQueueAttributes, making the meta-dispatch pattern race-safe without re-loading meta inside OCC transactions. ✅

Dormancy preservation: Every code path that could emit a v2 handle is gated on meta.PartitionCount > 1, which CreateQueue continues to reject via validatePartitionDormancyGate until PR 5b-3. ✅


Test coverage

10 new tests are well-targeted. Particular highlights:

  • TestSQSMsgVisScanBoundsDispatch_PerQueueOnPartitionedKeyspace directly pins the forward-note invariant from PR 731 round 2 (perQueue + partitioned keyspace must scan partitioned prefix, not legacy).
  • TestSQSServer_PartitionedFIFO_SendReceiveDeleteRoundTrip sanity-checks the legacy keyspace is empty after all operations, which is the cleanest possible regression gate for future dispatch regressions.
  • TestDecodeClientReceiptHandle_AcceptsV2 correctly documents the contract shift.

Missing: a test that a v2 handle with Partition >= meta.PartitionCount is rejected (tracking the medium finding above).
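
A sketch of that missing test (it would only pass once the bounds check above is added; the struct and constant names follow the validateReceiptHandleVersion snippet quoted later in this thread, and any other sqsQueueMeta fields are left at their zero values):

func TestValidateReceiptHandleVersion_RejectsOutOfRangePartition(t *testing.T) {
    meta := &sqsQueueMeta{PartitionCount: 4}
    handle := &decodedReceiptHandle{Version: sqsReceiptHandleVersion2, Partition: 63}
    require.Error(t, validateReceiptHandleVersion(meta, handle),
        "v2 handle with Partition >= PartitionCount must be rejected")
}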


Verdict: Ready to merge after the partition bounds check is added to validateReceiptHandleVersion and covered by a test. The rest of the implementation is sound.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements the data plane wiring for partitioned FIFO queues. It introduces dispatch helpers to route key construction, receipt handle encoding, and version validation based on the queue's partition count. Updates were made to message sending, receiving, deletion, and redriving logic to support these partitions. A review comment pointed out an improvement opportunity in the integration tests to use a more robust helper for calculating prefix scan bounds instead of manually appending bytes.

Comment on lines +181 to +184
// Cap the prefix scan at the generation byte so we do not
// drag in unrelated queues.
end := append([]byte(nil), legacyDataPrefix...)
end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)

medium

The logic for creating the end key for the prefix scan is incorrect. Appending 0xFF bytes does not correctly create an exclusive upper bound for all cases, especially when the prefix itself contains 0xFF bytes, which could cause the scan to read into adjacent key ranges (e.g., a different queue generation). This could lead to test flakiness or incorrect test behavior.

A more robust way to create an end key for a prefix is to find the next lexicographical key after the prefix. The codebase already has a helper for this, prefixScanEnd. You should use it here for correctness and consistency.

Suggested change
// Cap the prefix scan at the generation byte so we do not
// drag in unrelated queues.
end := append([]byte(nil), legacyDataPrefix...)
end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)
// The end key for a prefix scan is the smallest key that does not
// have the prefix. This correctly bounds the scan to only keys
// within the given queue and generation.
end := prefixScanEnd(legacyDataPrefix)


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 443ceb5f9c


Comment thread adapter/sqs_messages.go Outdated
Comment on lines +873 to +875
for partition := uint32(0); partition < partitions; partition++ {
if len(delivered) >= opts.Max {
break

P1: Avoid fixed partition-0 priority in receive fanout

When PartitionCount > 1, this loop always starts at partition 0 and exits as soon as opts.Max is filled, so repeated ReceiveMessage calls can permanently starve higher-index partitions whenever partition 0 has a sustained backlog. This only appears on partitioned queues, but once enabled it can make some message groups effectively undeliverable under load because later partitions are never scanned.


return errors.New("receipt handle version mismatch for non-partitioned queue")
}
return nil
}

P2: Reject out-of-range v2 receipt-handle partitions

The queue-aware version check now accepts any v2 handle on partitioned queues but never validates handle.Partition < meta.PartitionCount. A forged v2 handle with an out-of-range partition is then used to build partitioned keys in delete/change-visibility paths; the partition resolver fails closed for unknown partitions, which bubbles up as a generic internal error instead of ReceiptHandleIsInvalid. This is reachable once partitioned queues are enabled.


…und 1)

Codex P1: with the fanout always starting at partition 0, a sustained
backlog on partition 0 would fill opts.Max before the loop reached
partition 1 — messages in higher-index partitions were never observed
under load. Rotate the starting partition by readTS so consecutive
receives spread across every partition.

Helper startPartitionOffset folds the upper and lower 32 bits of the
HLC-derived readTS and mask-ANDs by partitions-1 (PartitionCount is
power-of-two by validator invariant). The byte-slice fold preserves
entropy from both HLC halves while keeping the conversion in uint32
space — no //nolint needed for gosec G115.

FIFO ordering is unaffected: partitionFor is deterministic, so a
MessageGroupId always lands in one partition and cross-partition
iteration order does not reorder messages within any group.

Caller audit: scanAndDeliverOnce / scanAndDeliverPartition signatures
and error semantics are unchanged; only the internal iteration order
rotates. New helper has only the one in-package caller.
Owner Author

bootjp commented May 2, 2026

@claude review

Round 1 addressing Codex P1: rotate the receive-fanout starting partition by readTS so a sustained backlog on a single partition cannot starve higher-index partitions.

Changes:

  • adapter/sqs_messages.go — new startPartitionOffset(partitions, readTS) helper. scanAndDeliverOnce now iterates (startOffset + i) % partitions. Implementation folds the upper and lower 32 bits of the HLC-derived readTS and mask-ANDs by partitions - 1 (power-of-two by validator invariant), keeping the computation in uint32 space (gosec G115 clean, no //nolint).
  • adapter/sqs_partitioned_dispatch_test.go — TestStartPartitionOffset_RotatesByReadTS pins: legacy/perQueue → 0, every partition appears as a starting point across consecutive readTS, and the output is always in [0, partitions).

FIFO ordering: unaffected. partitionFor is deterministic, so a MessageGroupId always lands in one partition; cross-partition iteration order never reorders messages within any group.

Caller audit: scanAndDeliverOnce and scanAndDeliverPartition signatures and error semantics are unchanged — only the internal iteration order rotates. The new helper has one in-package caller.

P2/medium findings (out-of-range v2 partition validation, prefix-scan helper in test) are intentionally deferred — below the loop's P0/P1/high/critical/major threshold.


claude Bot commented May 2, 2026

Claude finished @bootjp's task in 2m 22s


Code Review: Round 1 — Partition-starvation fix (startPartitionOffset)

  • Gather PR context and understand the request
  • Read adapter/sqs_messages.go — startPartitionOffset helper + fanout loop change
  • Read adapter/sqs_partitioned_dispatch_test.go — TestStartPartitionOffset_RotatesByReadTS
  • Review rotation logic correctness (gosec G115, power-of-two invariant)
  • Review FIFO ordering preservation
  • Review test coverage and edge cases
  • Post complete review

Overall assessment

The fix is correct and well-scoped. The P1 finding (partition 0 starvation under sustained backlog) is addressed cleanly without touching any signatures or error semantics. Implementation and test match the stated intent. A few minor observations follow.


Correctness: startPartitionOffset

func startPartitionOffset(partitions uint32, readTS uint64) uint32 {
    if partitions <= 1 {
        return 0
    }
    var buf [8]byte
    binary.BigEndian.PutUint64(buf[:], readTS)
    folded := binary.BigEndian.Uint32(buf[0:4]) ^ binary.BigEndian.Uint32(buf[4:8])
    return folded & (partitions - 1)
}

Power-of-two invariant — PartitionCount is power-of-two, enforced by htfifoValidatePartitionCount; effectivePartitionCount also returns 1 (= 2⁰) for the perQueue collapse, so mask-AND is equivalent to modulo throughout. ✅

gosec G115 avoidance — the byte-slice fold sidesteps the explicit uint64 → uint32 narrowing that gosec would flag. The approach correctly mirrors partitionFor. ✅

Distribution for real HLC values — the HLC layout is (physical_ms << 16) | logical_counter. Splitting at the 32-bit boundary gives:

  • upper32 = physical_ms >> 16 (changes slowly, ~once per ms)
  • lower32 = (physical_ms & 0xFFFF) << 16 | logical_counter (low 2 bits driven by logical_counter)

For consecutive receives in the same millisecond, logical_counter increments by 1 per call, so the low bits of folded increment by 1 → sequential cycling through all N partitions. ✅ For cross-millisecond boundaries, both halves change, maintaining spread.

Overflow safety in the loop — startOffset + i < 2 * partitions; with htfifoMaxPartitions = 32 the max sum is 62, well within uint32. % partitions is safe since effectivePartitionCount guarantees partitions >= 1. ✅


Minor observations

1. Test exercises only low ts values where upper 32 bits are zero (sqs_partitioned_dispatch_test.go:335–353)

The coverage loop runs ts = 1..64 and ts = 0..1023. With these values, upper32 = 0, so folded = lower32 = ts, and the offset cycles trivially: ts & 3 = 0,1,2,3,0,.... This is correct but it doesn't exercise the XOR fold with non-zero upper bits. Real HLC timestamps have physical_ms >> 16 ≈ 0x190xxx (non-zero). Adding a sub-case with an HLC-range seed (e.g., ts = uint64(1_735_000_000_000) << 16) would give more confidence that the fold doesn't accidentally collapse to a constant offset for real values. The fix itself is still sound — this is a test quality observation only.
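
A sketch of the suggested sub-case (it assumes the startPartitionOffset signature quoted above and the testify require helpers already used in this file; the test name and the seed value are only examples of an HLC-range timestamp):

func TestStartPartitionOffset_HLCRangeSeed(t *testing.T) {
    // Non-zero upper 32 bits, unlike the ts = 1..64 loops in the current test.
    seed := uint64(1_735_000_000_000) << 16
    seen := make(map[uint32]bool, 4)
    for i := uint64(0); i < 64; i++ {
        off := startPartitionOffset(4, seed+i)
        require.Less(t, off, uint32(4))
        seen[off] = true
    }
    require.Len(t, seen, 4, "fold must still reach every partition for HLC-range timestamps")
}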

2. Comment verbosity (sqs_messages.go:907–925, sqs_messages.go:872–882)

CLAUDE.md's convention is: "one short line max" for comments. The startPartitionOffset doc block is ~10 lines and the inline loop comment is ~11 lines. The gosec-G115 reasoning is genuinely non-obvious and worth keeping, but both blocks could be compressed significantly. Not blocking, but the inline loop comment in particular restates what's already in the function's doc.

3. Loop uses % where mask-AND would be consistent (sqs_messages.go:892)

partition := (startOffset + i) % partitions

startPartitionOffset uses & (partitions - 1) and explicitly documents the power-of-two invariant. Using & (partitions - 1) in the loop too would be consistent and self-documenting, avoiding the % operator whose correctness requires knowing that partitions is never 0 (which it isn't, but the reader has to check). Either form is correct.


FIFO ordering

Preserved. partitionFor is deterministic (FNV-1a over MessageGroupId), so a group always hashes to exactly one partition regardless of which partition the fanout starts from. Re-ordering scanAndDeliverPartition calls cannot reorder messages within any group because cross-partition messages are by definition different groups. ✅


Summary

The P1 fix is correct, gosec-clean, and backward-compatible on every queue mode. The three observations above are all minor or nit-level. The implementation is ready to merge.


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 52c449f5b3


Comment thread adapter/sqs_messages.go
Comment on lines +884 to +885
startOffset := startPartitionOffset(partitions, readTS)
for i := uint32(0); i < partitions; i++ {

P1: Rotate receive start independently from commit-ts stride

Using readTS as the only rotation source can still starve partitions under sustained load, because readTS comes from the last committed timestamp and often advances by a fixed stride (roughly the number of successful per-message rotations in each receive call). With power-of-two partitions, masking this structured value can lock start offsets to a subset (e.g. 4 partitions + MaxNumberOfMessages=10 can alternate between two starts), so if those early partitions stay backlogged, later partitions are never scanned.



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (1)
adapter/sqs_partitioned_dispatch_test.go (1)

288-317: ⚡ Quick win

Pin the single-pass behavior instead of retrying receives.

This loop lets the test pass even if effectivePartitionCount never collapses to 1, because repeated receives can still drain partition 0 eventually. If the contract here is “one receive call must surface every message,” make that the assertion directly.

Suggested tightening
 	collected := make(map[string]bool, len(groups))
-	for range 4 {
-		if len(collected) == len(groups) {
-			break
-		}
-		status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
-			"QueueUrl":            queueURL,
-			"MaxNumberOfMessages": 10,
-			"VisibilityTimeout":   60,
-		})
-		require.Equal(t, http.StatusOK, status, "receive: %v", out)
-		msgs, _ := out["Messages"].([]any)
-		for _, m := range msgs {
-			mm, _ := m.(map[string]any)
-			body, _ := mm["Body"].(string)
-			collected[body] = true
-			handle, _ := mm["ReceiptHandle"].(string)
-			parsed, err := decodeReceiptHandle(handle)
-			require.NoError(t, err)
-			require.Equal(t, sqsReceiptHandleVersion2, parsed.Version,
-				"perQueue + PartitionCount=4 must still produce v2 handles")
-			require.Equal(t, uint32(0), parsed.Partition,
-				"perQueue mode pins every group to partition 0, so every handle must record Partition=0")
-		}
-	}
+	status, out := callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
+		"QueueUrl":            queueURL,
+		"MaxNumberOfMessages": 10,
+		"VisibilityTimeout":   60,
+	})
+	require.Equal(t, http.StatusOK, status, "receive: %v", out)
+	msgs, _ := out["Messages"].([]any)
+	for _, m := range msgs {
+		mm, _ := m.(map[string]any)
+		body, _ := mm["Body"].(string)
+		collected[body] = true
+		handle, _ := mm["ReceiptHandle"].(string)
+		parsed, err := decodeReceiptHandle(handle)
+		require.NoError(t, err)
+		require.Equal(t, sqsReceiptHandleVersion2, parsed.Version,
+			"perQueue + PartitionCount=4 must still produce v2 handles")
+		require.Equal(t, uint32(0), parsed.Partition,
+			"perQueue mode pins every group to partition 0, so every handle must record Partition=0")
+	}
 	require.Len(t, collected, len(groups),
 		"perQueue receive must surface every message in one fanout pass over partition 0")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_partitioned_dispatch_test.go` around lines 288 - 317, The test
currently loops multiple receive calls which masks failures by eventually
draining partition 0; instead make a single receive call using
callSQS(sqsReceiveMessageTarget, ...) and assert that the response contains
exactly len(groups) Messages in that one call, then iterate those messages to
validate decodeReceiptHandle parsed.Version equals sqsReceiptHandleVersion2 and
parsed.Partition == 0; remove the retry loop and fail the test immediately if
the single receive doesn't return all messages so the contract “one receive call
must surface every message” is enforced.

📥 Commits

Reviewing files that changed from the base of the PR and between c7ed1a1 and 52c449f.

📒 Files selected for processing (9)
  • adapter/sqs_fifo.go
  • adapter/sqs_keys_dispatch.go
  • adapter/sqs_keys_dispatch_v2_test.go
  • adapter/sqs_messages.go
  • adapter/sqs_messages_batch.go
  • adapter/sqs_partitioned_dispatch_test.go
  • adapter/sqs_reaper.go
  • adapter/sqs_receipt_handle_v2_test.go
  • adapter/sqs_redrive.go

Comment on lines +182 to +195
func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
    if handle == nil {
        return errors.New("receipt handle is nil")
    }
    if meta != nil && meta.PartitionCount > 1 {
        if handle.Version != sqsReceiptHandleVersion2 {
            return errors.New("receipt handle version mismatch for partitioned queue")
        }
        return nil
    }
    if handle.Version != sqsReceiptHandleVersion1 {
        return errors.New("receipt handle version mismatch for non-partitioned queue")
    }
    return nil

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject out-of-range v2 partitions in the meta-aware validator.

handle.Partition is now client-controlled once decodeClientReceiptHandle accepts v2. This helper only checks the version, so Partition >= meta.PartitionCount falls through to sqsMsg*KeyDispatch and depends on downstream routing failure semantics instead of returning ReceiptHandleIsInvalid. This is the right choke point to bounds-check before PR 5b-3 lifts the gate.

Suggested fix
 func validateReceiptHandleVersion(meta *sqsQueueMeta, handle *decodedReceiptHandle) error {
 	if handle == nil {
 		return errors.New("receipt handle is nil")
 	}
 	if meta != nil && meta.PartitionCount > 1 {
 		if handle.Version != sqsReceiptHandleVersion2 {
 			return errors.New("receipt handle version mismatch for partitioned queue")
 		}
+		if handle.Partition >= meta.PartitionCount {
+			return errors.New("receipt handle partition out of range for queue")
+		}
 		return nil
 	}
 	if handle.Version != sqsReceiptHandleVersion1 {
 		return errors.New("receipt handle version mismatch for non-partitioned queue")
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_keys_dispatch.go` around lines 182 - 195, In
validateReceiptHandleVersion, add a bounds check for v2 handles so that when
meta != nil && meta.PartitionCount > 1 and handle.Version ==
sqsReceiptHandleVersion2 you also verify handle.Partition < meta.PartitionCount;
if the partition is out of range return the ReceiptHandleIsInvalid error (or an
appropriate error indicating invalid receipt handle) instead of allowing it to
fall through to downstream routing; update the function logic around
validateReceiptHandleVersion to perform this check before returning nil.

Comment on lines +91 to +106
groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"}
sent := make(map[string]string, len(groups)) // group -> messageID
for i, g := range groups {
    body := "body-" + g
    dedup := "dedup-" + g
    status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{
        "QueueUrl":               queueURL,
        "MessageBody":            body,
        "MessageGroupId":         g,
        "MessageDeduplicationId": dedup,
    })
    require.Equal(t, http.StatusOK, status,
        "send #%d (group=%s): %v", i, g, out)
    msgID, _ := out["MessageId"].(string)
    require.NotEmpty(t, msgID, "send #%d: empty MessageId", i)
    sent[g] = msgID

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Assert the expected partition, not just that it is in range.

parsed.Partition < 4 still passes if every MessageGroupId is accidentally routed to partition 0, so this round-trip would miss the main dispatch regression it is meant to catch. Since the test already records group -> MessageId, reverse that map and compare each decoded handle partition to partitionFor(meta, group).

Suggested tightening
 	groups := []string{"alpha", "beta", "gamma", "delta", "epsilon", "zeta"}
 	sent := make(map[string]string, len(groups)) // group -> messageID
+	byMessageID := make(map[string]string, len(groups)) // messageID -> group
 	for i, g := range groups {
 		body := "body-" + g
 		dedup := "dedup-" + g
 		status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{
@@
 		require.Equal(t, http.StatusOK, status,
 			"send #%d (group=%s): %v", i, g, out)
 		msgID, _ := out["MessageId"].(string)
 		require.NotEmpty(t, msgID, "send #%d: empty MessageId", i)
 		sent[g] = msgID
+		byMessageID[msgID] = g
 	}
+
+	ctx := context.Background()
+	readTS := node.sqsServer.nextTxnReadTS(ctx)
+	meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS)
+	require.NoError(t, err)
+	require.True(t, exists)

@@
 			require.Less(t, parsed.Partition, uint32(4),
 				"v2 handle partition out of range")
+			require.Equal(t,
+				partitionFor(meta, byMessageID[id]),
+				parsed.Partition,
+				"message %s routed to wrong partition", id,
+			)

 			collected[id] = handle
 		}
 	}

Also applies to: 131-139

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_partitioned_dispatch_test.go` around lines 91 - 106, The test
currently only asserts parsed.Partition is within a range which can hide
misrouting; instead, after populating sent (the map from group -> MessageId)
reverse that map to look up group by MessageId and assert that for each received
handle the decoded parsed.Partition equals partitionFor(meta, group).
Concretely: after sending messages with callSQS (using MessageGroupId) and
collecting sent[g]=msgID, when you decode handles check the MessageId from the
handle, find the original group via the reversed sent map, then replace the
loose parsed.Partition < 4 assertion with require.Equal(t, partitionFor(meta,
group), parsed.Partition) so each message is asserted to land on its expected
partition.

Comment on lines +160 to +191
// Queue must now be empty even after the visibility timeout
// would have re-exposed it.
status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
    "QueueUrl":            queueURL,
    "MaxNumberOfMessages": 10,
})
require.Equal(t, http.StatusOK, status, "post-delete receive: %v", out)
if msgs, _ := out["Messages"].([]any); len(msgs) > 0 {
    t.Fatalf("expected empty queue after delete; got %d messages", len(msgs))
}

// Sanity: the legacy keyspace must be empty (every send on
// this queue went to the partitioned keyspace, never the
// legacy one). We probe the legacy data prefix at this
// queue's generation: it must yield zero entries.
ctx := context.Background()
readTS := node.sqsServer.nextTxnReadTS(ctx)
meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS)
require.NoError(t, err)
require.True(t, exists)
legacyDataPrefix := sqsMsgDataKey(queueName, meta.Generation, "")
// Cap the prefix scan at the generation byte so we do not
// drag in unrelated queues.
end := append([]byte(nil), legacyDataPrefix...)
end = append(end, 0xFF, 0xFF, 0xFF, 0xFF)
page, err := node.sqsServer.store.ScanAt(ctx, legacyDataPrefix, end, 32, readTS)
require.NoError(t, err)
for _, kvp := range page {
    require.False(t, strings.HasPrefix(string(kvp.Key), string(legacyDataPrefix)),
        "legacy data key found on a partitioned queue (key=%q) — wiring leaked into the wrong keyspace",
        string(kvp.Key))
}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

The post-delete emptiness check can still pass with undeleted invisible messages.

All earlier receives use VisibilityTimeout: 60, but this assertion re-receives immediately. If DeleteMessage regresses into “leave the record invisible but not removed,” this block still passes. The legacy-prefix probe below does not close that gap because it never inspects the partitioned keyspace.

Stronger options
-	// Queue must now be empty even after the visibility timeout
-	// would have re-exposed it.
+	// Queue must now be empty. To avoid a false pass from in-flight
+	// visibility, either wait past a short timeout or inspect the
+	// partitioned keyspace directly.
-			"VisibilityTimeout":   60,
+			"VisibilityTimeout":   1,
+	time.Sleep(1100 * time.Millisecond)
 	status, out = callSQS(t, node, sqsReceiveMessageTarget, map[string]any{
 		"QueueUrl":            queueURL,
 		"MaxNumberOfMessages": 10,
 	})

Or, better for determinism, scan the partitioned data/visibility prefixes and assert they are empty after the deletes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_partitioned_dispatch_test.go` around lines 160 - 191, The current
post-delete check re-calls callSQS(sqsReceiveMessageTarget) immediately and can
miss undeleted but invisible messages (earlier receives used VisibilityTimeout:
60), so a regression that leaves records invisible would still pass; instead of
relying on immediate receive, either wait > VisibilityTimeout before
re-receiving or (preferred) directly assert the partitioned keyspace is empty by
scanning the partitioned data and visibility prefixes with
node.sqsServer.store.ScanAt using the queue generation from
node.sqsServer.loadQueueMetaAt (you already use sqsMsgDataKey for the legacy
prefix—compute the analogous partitioned data/visibility prefixes for this queue
generation and assert ScanAt returns zero entries), keeping the existing
legacy-prefix check for completeness.

